{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "_y1Uk2R61WqF"
      },
      "source": [
        "# End-to-end RAG using Elasticsearch and Cohere\n",
        "\n",
        "Learn how to use the [Inference API](https://www.elastic.co/guide/en/elasticsearch/reference/current/inference-apis.html) for semantic search and use [Cohere's APIs](https://docs.cohere.com/docs/the-cohere-platform) for RAG."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "kH7ioI4w3IF7"
      },
      "source": [
        "# 🧰 Requirements\n",
        "\n",
        "For this example, you will need:\n",
        "\n",
        "- An Elastic Serverless account through [Elastic Cloud](https://www.elastic.co/guide/en/cloud/current/ec-getting-started.html), available with a [free trial](https://cloud.elastic.co/registration?utm_source=github&utm_content=elasticsearch-labs-notebook)\n",
        "   \n",
        "- A [Cohere account](https://cohere.com/) with a production API key\n",
        "\n",
        "- Python 3.7 or higher\n",
        "\n",
        "Note: While this tutorial integrates Cohere with an Elastic Cloud serverless project, you can also integrate with your self-managed Elasticsearch deployment or Elastic Cloud deployment by simply switching from using a Serverless endpoint in the Elasticsearch client."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "GaEgtsBIj4uZ"
      },
      "source": [
        "# Create an Elastic Serverless deployment\n",
        "\n",
        "If you don't have an Elastic Cloud deployment, sign up [here](https://cloud.elastic.co/registration?utm_source=github&utm_content=elasticsearch-labs-notebook) for a free trial and request access to Elastic Serverless"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "Df7Q5EEi54-p"
      },
      "source": [
        "# Install packages and connect with Elasticsearch Serverless Client\n",
        "\n",
        "To get started, we'll need to connect to our Elastic Serverless deployment using the Python client.\n",
        "\n",
        "First we need to `pip` install the following packages:\n",
        "\n",
        "- `elasticsearch_serverless`\n",
        "- `cohere`\n",
        "\n",
        "After installing, in the Serverless dashboard, find your endpoint URL, and create your API key."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "dIyWMJqtjuoq"
      },
      "outputs": [],
      "source": [
        "pip install elasticsearch_serverless cohere"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "mUTDs8zrm9HV"
      },
      "source": [
        "Next, we need to import the modules we need. 🔐 NOTE: getpass enables us to securely prompt the user for credentials without echoing them to the terminal, or storing it in memory."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 22,
      "metadata": {
        "id": "eYJx2lx37Zu0"
      },
      "outputs": [],
      "source": [
        "from elasticsearch_serverless import Elasticsearch, helpers\n",
        "from getpass import getpass\n",
        "import cohere\n",
        "import json\n",
        "import requests"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "VQ0EPyNb7qvs"
      },
      "source": [
        "\n",
        "\n",
        "Now we can instantiate the Python Elasticsearch client.\n",
        "\n",
        "First we prompt the user for their endpoint and encoded API key.\n",
        "Then we create a `client` object that instantiates an instance of the `Elasticsearch` class.\n",
        "\n",
        "When creating your Elastic Serverless API key make sure to turn on Control security privileges, and edit cluster privileges to specify `\"cluster\": [\"all\"]`"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 3,
      "metadata": {
        "id": "HUjlUIu5m8fs"
      },
      "outputs": [],
      "source": [
        "ELASTICSEARCH_ENDPOINT = getpass(\"Elastic Endpoint: \")\n",
        "ELASTIC_API_KEY = getpass(\"Elastic encoded API key: \") # Use the encoded API key\n",
        "\n",
        "client = Elasticsearch(\n",
        "  ELASTICSEARCH_ENDPOINT,\n",
        "  api_key=ELASTIC_API_KEY\n",
        ")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "t_m42KWc8Ju6"
      },
      "source": [
        "Confirm that the client has connected with this test:"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "z6EMEbOH8LeE"
      },
      "outputs": [],
      "source": [
        "print(client.info())"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "SnV1Y4k8nxPI"
      },
      "source": [
        "## Create the inference endpoint\n",
        "\n",
        "Let's create the inference endpoint by using the [Create inference API](https://www.elastic.co/guide/en/elasticsearch/reference/current/put-inference-api.html).\n",
        "\n",
        "You'll need an Cohere API key for this that you can find in your Cohere account under the [API keys section](https://dashboard.cohere.com/api-keys). A production key is required to complete the steps in this notebook as the Cohere free trial API usage is limited.\n",
        "\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "G_7pbrkYALQJ"
      },
      "outputs": [],
      "source": [
        "COHERE_API_KEY = getpass(\"Enter Cohere API key:  \")\n",
        "\n",
        "# Delete the inference model if it already exists\n",
        "client.options(ignore_status=[404]).inference.delete_model(inference_id=\"cohere_embeddings\")\n",
        "\n",
        "client.inference.put_model(\n",
        "    task_type=\"text_embedding\",\n",
        "    inference_id=\"cohere_embeddings\",\n",
        "    body={\n",
        "        \"service\": \"cohere\",\n",
        "        \"service_settings\": {\n",
        "            \"api_key\": COHERE_API_KEY,\n",
        "            \"model_id\": \"embed-english-v3.0\",\n",
        "            \"embedding_type\": \"int8\",\n",
        "            \"similarity\": \"cosine\"\n",
        "        },\n",
        "        \"task_settings\": {},\n",
        "    },\n",
        ")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "dNv0AntHAVuR"
      },
      "source": [
        "## Create an ingest pipeline with an inference processor\n",
        "\n",
        "Create an ingest pipeline with an inference processor by using the [`put_pipeline`](https://www.elastic.co/guide/en/elasticsearch/reference/master/put-pipeline-api.html) method. Reference the inference endpoint created above as the `model_id` to infer against the data that is being ingested in the pipeline."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "4-9qsVISAZZM"
      },
      "outputs": [],
      "source": [
        "# Delete the inference pipeline if it already exists\n",
        "client.options(ignore_status=[404]).ingest.delete_pipeline(id=\"cohere_embeddings\")\n",
        "\n",
        "client.ingest.put_pipeline(\n",
        "    id=\"cohere_embeddings\",\n",
        "    description=\"Ingest pipeline for Cohere inference.\",\n",
        "    processors=[\n",
        "        {\n",
        "            \"inference\": {\n",
        "                \"model_id\": \"cohere_embeddings\",\n",
        "                \"input_output\": {\n",
        "                    \"input_field\": \"text\",\n",
        "                    \"output_field\": \"text_embedding\",\n",
        "                },\n",
        "            }\n",
        "        }\n",
        "    ],\n",
        ")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "NMZc9T40Acid"
      },
      "source": [
        "Let's note a few important parameters from that API call:\n",
        "\n",
        "- `inference`: A processor that performs inference using a machine learning model.\n",
        "- `model_id`: Specifies the ID of the inference endpoint to be used. In this example, the model ID is set to `cohere_embeddings`.\n",
        "- `input_output`: Specifies input and output fields.\n",
        "- `input_field`: Field name from which the `dense_vector` representation is created.\n",
        "- `output_field`:  Field name which contains inference results."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "W3sTZA8uAh7T"
      },
      "source": [
        "## Create index\n",
        "\n",
        "The mapping of the destination index – the index that contains the embeddings that the model will create based on your input text – must be created. The destination index must have a field with the [dense_vector](https://www.elastic.co/guide/en/elasticsearch/reference/current/dense-vector.html) field type to index the output of the Cohere model.\n",
        "\n",
        "Let's create an index named `cohere-wiki-embeddings` with the mappings we need."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "qVA-oip9AqOn"
      },
      "outputs": [],
      "source": [
        "client.indices.delete(index=\"cohere-wiki-embeddings\", ignore_unavailable=True)\n",
        "client.indices.create(\n",
        "    index=\"cohere-wiki-embeddings\",\n",
        "    settings={\"index\": {\"default_pipeline\": \"cohere_embeddings\"}},\n",
        "    mappings={\n",
        "        \"properties\": {\n",
        "            \"text_embedding\": {\n",
        "                \"type\": \"dense_vector\",\n",
        "                \"dims\": 1024,\n",
        "                \"element_type\": \"byte\"\n",
        "            },\n",
        "            \"text\": {\"type\": \"text\"},\n",
        "            \"wiki_id\": {\"type\": \"integer\"},\n",
        "            \"url\": {\"type\": \"text\"},\n",
        "            \"views\": {\"type\": \"float\"},\n",
        "            \"langs\": {\"type\": \"integer\"},\n",
        "            \"title\": {\"type\": \"text\"},\n",
        "            \"paragraph_id\": {\"type\": \"integer\"},\n",
        "            \"id\": {\"type\": \"integer\"}\n",
        "        }\n",
        "    },\n",
        ")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "PrMBgMAbBAUf"
      },
      "source": [
        "## Insert Documents\n",
        "\n",
        "Let's insert our example wiki dataset. You need a production Cohere account to complete this step, otherwise the documentation ingest will time out due to the API request rate limits."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "qZJjDmsiBGM1"
      },
      "outputs": [],
      "source": [
        "url = \"https://raw.githubusercontent.com/cohere-ai/notebooks/main/notebooks/data/embed_jobs_sample_data.jsonl\"\n",
        "response = requests.get(url)\n",
        "\n",
        "# Load the response data into a JSON object\n",
        "jsonl_data = response.content.decode('utf-8').splitlines()\n",
        "\n",
        "# Prepare the documents to be indexed\n",
        "documents = []\n",
        "for line in jsonl_data:\n",
        "    data_dict = json.loads(line)\n",
        "    documents.append({\n",
        "        \"_index\": \"cohere-wiki-embeddings\",\n",
        "        \"_source\": data_dict,\n",
        "        }\n",
        "      )\n",
        "\n",
        "# Use the bulk endpoint to index\n",
        "helpers.bulk(client, documents)\n",
        "\n",
        "print(\"Done indexing documents into `cohere-wiki-embeddings` index!\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "RZqvFbphC8mk"
      },
      "source": [
        "## Hybrid search\n",
        "\n",
        "After the dataset has been enriched with the embeddings, you can query the data using hybrid search.\n",
        "\n",
        "Pass a `query_vector_builder` to the k-nearest neighbor (kNN) vector search API, and provide the query text and the model you have used to create the embeddings."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "X7_rytHVDG9M"
      },
      "outputs": [],
      "source": [
        "query = \"When were the semi-finals of the 2022 FIFA world cup played?\"\n",
        "\n",
        "response = client.search(\n",
        "    index=\"cohere-wiki-embeddings\",\n",
        "    size=100,\n",
        "    knn={\n",
        "        \"field\": \"text_embedding\",\n",
        "        \"query_vector_builder\": {\n",
        "            \"text_embedding\": {\n",
        "                \"model_id\": \"cohere_embeddings\",\n",
        "                \"model_text\": query,\n",
        "            }\n",
        "        },\n",
        "        \"k\": 10,\n",
        "        \"num_candidates\": 50,\n",
        "    },\n",
        "    query={\n",
        "      \"multi_match\": {\n",
        "          \"query\": query,\n",
        "          \"fields\": [\"text\", \"title\"]\n",
        "        }\n",
        "      }\n",
        ")\n",
        "\n",
        "raw_documents = response[\"hits\"][\"hits\"]\n",
        "\n",
        "# Display the first 10 results\n",
        "for document in raw_documents[0:10]:\n",
        "  print(f'Title: {document[\"_source\"][\"title\"]}\\nText: {document[\"_source\"][\"text\"]}\\n')\n",
        "\n",
        "# Format the documents for ranking\n",
        "documents = []\n",
        "for hit in response[\"hits\"][\"hits\"]:\n",
        "    documents.append(hit[\"_source\"][\"text\"])"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "Bjac94OCE11n"
      },
      "source": [
        "## Ranking\n",
        "In order to effectively combine the results from our vector and BM25 retrieval, we can use Cohere's Rerank 3 model through the inference API to provide a final, more precise, semantic reranking of our results."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "9edNoqn4Ge_U"
      },
      "source": [
        "First, create an inference endpoint with your Cohere API key. Make sure to specify a name for your endpoint, and the model_id of one of the rerank models. In this example we will use Rerank 3."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "kqcetSgmgA-u"
      },
      "outputs": [],
      "source": [
        "# Delete the inference model if it already exists\n",
        "client.options(ignore_status=[404]).inference.delete_model(inference_id=\"cohere_rerank\")\n",
        "\n",
        "client.inference.put_model(\n",
        "    task_type=\"rerank\",\n",
        "    inference_id=\"cohere_rerank\",\n",
        "    body={\n",
        "        \"service\": \"cohere\",\n",
        "        \"service_settings\":{\n",
        "            \"api_key\": COHERE_API_KEY,\n",
        "            \"model_id\": \"rerank-english-v3.0\"\n",
        "           },\n",
        "        \"task_settings\": {\n",
        "            \"top_n\": 10,\n",
        "        },\n",
        "    }\n",
        ")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "VDPdDNejGs4d"
      },
      "source": [
        "You can now rerank your results using that inference endpoint. Here we will pass in the query we used for retrieval, along with the documents we just retrieved using hybrid search.\n",
        "\n",
        "The inference service will respond with a list of documents in descending order of relevance. Each document has a corresponding index (reflecting to the order the documents were in when sent to the inference endpoint), and if the “return_documents” task setting is True, then the document texts will be included as well.\n",
        "\n",
        "In this case we will set the response to False and will reconstruct the input documents based on the index returned in the response."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "Irn73NfjGjt7"
      },
      "outputs": [],
      "source": [
        "response = client.inference.inference(\n",
        "    inference_id=\"cohere_rerank\",\n",
        "    body={\n",
        "        \"query\": query,\n",
        "        \"input\": documents,\n",
        "        \"task_settings\": {\n",
        "            \"return_documents\": False\n",
        "            }\n",
        "        }\n",
        ")\n",
        "\n",
        "# Reconstruct the input documents based on the index provided in the rereank response\n",
        "ranked_documents = []\n",
        "for document in response.body[\"rerank\"]:\n",
        "  ranked_documents.append({\n",
        "      \"title\": raw_documents[int(document[\"index\"])][\"_source\"][\"title\"],\n",
        "      \"text\": raw_documents[int(document[\"index\"])][\"_source\"][\"text\"]\n",
        "  })\n",
        "\n",
        "# Print the top 10 results\n",
        "for document in ranked_documents[0:10]:\n",
        "  print(f\"Title: {document['title']}\\nText: {document['text']}\\n\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "LWvscYT7Lj67"
      },
      "source": [
        "# Retrieval augemented generation\n",
        "\n",
        "Now that we have ranked our results, we can easily turn this into a RAG system with Cohere's Chat API. Pass in the retrieved documents, along with the query and see the grounded response using Cohere's newest generative model Command R+."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "fPBvQCPKlNko"
      },
      "source": [
        "First, we will create the Cohere client."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "hJU-fPR5QKxg"
      },
      "outputs": [],
      "source": [
        "co = cohere.Client(COHERE_API_KEY)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "wRxJqil0QO0Y"
      },
      "source": [
        "Next, we can easily get a grounded generation with citations from the Cohere Chat API. We simply pass in the user query and documents retrieved from Elastic to the API, and print out our grounded response."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "Coj63Hyhle_T"
      },
      "outputs": [],
      "source": [
        "response = co.chat(\n",
        "    message=query,\n",
        "    documents=ranked_documents,\n",
        "    model='command-r-plus'\n",
        ")\n",
        "\n",
        "source_documents = []\n",
        "for citation in response.citations:\n",
        "  for document_id in citation.document_ids:\n",
        "    if document_id not in source_documents:\n",
        "      source_documents.append(document_id)\n",
        "\n",
        "print(f\"Query: {query}\")\n",
        "print(f\"Response: {response.text}\")\n",
        "print(\"Sources:\")\n",
        "for document in response.documents:\n",
        "  if document['id'] in source_documents:\n",
        "    print(f\"{document['title']}: {document['text']}\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "2t5R9b-vRz74"
      },
      "source": [
        "And there you have it! A quick and easy implementation of hybrid search and RAG with Cohere and Elastic."
      ]
    }
  ],
  "metadata": {
    "colab": {
      "provenance": []
    },
    "kernelspec": {
      "display_name": "Python 3",
      "name": "python3"
    },
    "language_info": {
      "name": "python"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 0
}
